package com.remoter.transport.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.SettableFuture;
import com.remoter.api.consumer.IConsumerService;
import com.remoter.api.exception.ClientUnAvailableException;
import com.remoter.api.exception.ConnectErrorException;
import com.remoter.api.exception.ReadTimeoutException;
import com.remoter.api.exception.WriteTimeoutException;
import com.remoter.api.extension.annotation.ExtensionName;
import com.remoter.api.extension.support.ExtensionLoader;
import com.remoter.api.protocol.support.RemoterRequest;
import com.remoter.api.protocol.support.RemoterResponse;
import com.remoter.api.transport.support.AbstractClient;
import com.remoter.api.util.Final;
import com.remoter.transport.netty.codec.MessageDecoder;
import com.remoter.transport.netty.codec.MessageEncoder;
import com.remoter.transport.netty.handler.ClientMessageHandler;
import com.remoter.transport.netty.util.FinalTransportNetty;

/**
 * Client side transport built on Netty NIO sockets.
 * <p>
 * One instance owns one {@link NioEventLoopGroup} and talks to at most one
 * remote address at a time. Requests are handed to the {@link ClientMessageHandler}
 * installed in the channel pipeline and the calling thread blocks until the
 * response future completes or the configured read timeout expires.
 * <p>
 * All three timeouts are passed to APIs as milliseconds and are read once,
 * at construction time, from the inherited configuration.
 */
@ExtensionName("netty")
public class NettyClient extends AbstractClient{
	
	/** Maximum time to wait for the TCP connect, in milliseconds. */
	protected final Integer connectTimeout;
	/** Write timeout handed to the message handler for every request. */
	protected final Integer writeTimeout;
	/** Maximum time to wait for a response, in milliseconds. */
	protected final Integer readTimeout;
	/** Event loop group driving the channel; shut down in {@link #disConnect()}. */
	protected final NioEventLoopGroup work;
	/** Consumer service notified when the remote side goes away. */
	protected final IConsumerService consumerService;
	
	/** Handler of the currently connected channel, {@code null} until {@link #connect} succeeds. */
	protected ClientMessageHandler messageHandler;
	/** Address passed to the most recent {@link #connect} call. */
	protected InetSocketAddress remote;
	/** Local socket address of the connected channel, {@code null} before a successful connect. */
	protected InetSocketAddress local;
	
	/**
	 * Creates the client, resolving the consumer service extension and the
	 * timeout options from the inherited configuration.
	 */
	public NettyClient(){
		this.consumerService = ExtensionLoader.getService(IConsumerService.class,this.configuration.getOption(Final.O_SERVER_CONSUMER));
		this.work = new NioEventLoopGroup();
		this.connectTimeout = this.configuration.getOption(FinalTransportNetty.O_TRANSPORT_CLIENT_CONNECT_TIMEOUT);
		this.writeTimeout = this.configuration.getOption(FinalTransportNetty.O_TRANSPORT_CLIENT_WRITE_TIMEOUT);
		this.readTimeout = this.configuration.getOption(FinalTransportNetty.O_TRANSPORT_CLIENT_READ_TIMEOUT);
	}

	/**
	 * Sends a request and blocks until the response arrives.
	 *
	 * @param remoterRequest the request to send
	 * @return the response completed by the message handler
	 * @throws ClientUnAvailableException if the client is not connected
	 * @throws WriteTimeoutException if the handler reported a write timeout,
	 *         either directly or as the cause of the failed future
	 * @throws ReadTimeoutException if no response arrived within the read
	 *         timeout, the waiting thread was interrupted, or any other
	 *         failure occurred while waiting
	 */
	@Override
	public RemoterResponse sendRemoterRequest(RemoterRequest remoterRequest) throws ClientUnAvailableException,ReadTimeoutException,WriteTimeoutException {
		if(!this.isAvailable()){
			throw ClientUnAvailableException.INSTANCE;
		}
		try{
			SettableFuture<RemoterResponse> future = this.messageHandler.sendRemoterRequest(this.writeTimeout,remoterRequest);
			return future.get(this.readTimeout,TimeUnit.MILLISECONDS);
		}catch(InterruptedException e){
			// Restore the interrupt flag so callers further up the stack can still observe it.
			Thread.currentThread().interrupt();
			throw ReadTimeoutException.INSTANCE;
		}catch(Exception e){
			// A write timeout is part of the declared contract; do not report it as a read timeout.
			if(e instanceof WriteTimeoutException){
				throw (WriteTimeoutException)e;
			}
			if(e.getCause() instanceof WriteTimeoutException){
				throw (WriteTimeoutException)e.getCause();
			}
			// The shared exception instance carries no cause, so leave a trace of what really happened.
			logger.debug("request to " + this.remote + " failed : " + e);
			throw ReadTimeoutException.INSTANCE;
		}
	}
	
	/**
	 * Connects to the given remote address. Does nothing when the client is
	 * already available.
	 * <p>
	 * On success the local address and the pipeline's message handler are
	 * recorded, and a close listener is registered that calls
	 * {@code consumerService.disConnect(remote)} once the channel closes.
	 *
	 * @param remote the address to connect to, must not be {@code null}
	 * @throws IllegalArgumentException if {@code remote} is {@code null}
	 * @throws ConnectErrorException if the connection could not be established
	 *         within the connect timeout
	 */
	@Override
	public void connect(final InetSocketAddress remote)throws ConnectErrorException{
		if(this.isAvailable()){
			return;
		}
		if(null == remote){
			throw new IllegalArgumentException("remote address error");
		}
		this.remote = remote;
		Bootstrap bootstrap = new Bootstrap();
		bootstrap.group(this.work);
		bootstrap.channel(NioSocketChannel.class);
		bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
		bootstrap.option(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT);
		// Let Netty itself enforce the timeout so a failed attempt carries a cause.
		bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,this.connectTimeout);
		bootstrap.handler(new ChannelInitializer<SocketChannel>(){
			@Override
			protected void initChannel(SocketChannel socketChannel) throws Exception {
				ChannelPipeline channelPipeline = socketChannel.pipeline();
				channelPipeline.addLast(new MessageDecoder(serialization));
				channelPipeline.addLast(new MessageEncoder(serialization));
				channelPipeline.addLast(new ClientMessageHandler(consumerService));
			}
		});
		logger.info("begin connect :" + this.remote);
		ChannelFuture channelFuture = bootstrap.connect(this.remote);
		boolean result = channelFuture.awaitUninterruptibly(this.connectTimeout,TimeUnit.MILLISECONDS);
		if(result && channelFuture.isSuccess()){
			this.local = (InetSocketAddress)channelFuture.channel().localAddress();
			this.messageHandler = channelFuture.channel().pipeline().get(ClientMessageHandler.class);
			channelFuture.channel().closeFuture().addListener(new ChannelFutureListener(){
				@Override
				public void operationComplete(ChannelFuture future) throws Exception {
					logger.debug("provider offline , disconnect " + remote);
					consumerService.disConnect(remote);
				}
			});
		}else{
			// Abort the pending attempt and release the channel; otherwise a connect
			// that completes after the wait expired would leave an orphaned socket.
			channelFuture.cancel(true);
			channelFuture.channel().close();
			throw new ConnectErrorException("begin connect :" + this.remote,channelFuture.cause());
		}
	}
	
	/**
	 * Closes the message handler if the client is available and shuts the
	 * event loop group down gracefully.
	 */
	@Override
	public void disConnect(){
		if(this.isAvailable()){
			this.messageHandler.close();
		}
		// work is final and always assigned in the constructor, so no null check is needed.
		this.work.shutdownGracefully();
	}

	/**
	 * @return {@code true} when a message handler exists and reports itself available
	 */
	@Override
	public boolean isAvailable(){
		return null != this.messageHandler && this.messageHandler.isAvailable();
	}

	/**
	 * @return the local address recorded by the last successful connect,
	 *         or {@code null} if none succeeded yet
	 */
	@Override
	public InetSocketAddress getLocalAddress() {
		return this.local;
	}
	
}